/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigtable;



import com.google.auto.value.AutoValue;

import com.google.bigtable.v2.Mutation;

import com.google.bigtable.v2.Row;

import com.google.bigtable.v2.RowFilter;

import com.google.bigtable.v2.SampleRowKeysResponse;

import com.google.cloud.bigtable.config.BigtableOptions;

import com.google.protobuf.ByteString;

import com.bff.gaia.unified.sdk.PipelineRunner;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.extensions.protobuf.ProtoCoder;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.BoundedSource.BoundedReader;

import com.bff.gaia.unified.sdk.io.range.ByteKey;

import com.bff.gaia.unified.sdk.io.range.ByteKeyRange;

import com.bff.gaia.unified.sdk.io.range.ByteKeyRangeTracker;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects.ToStringHelper;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.IOException;

import java.util.*;

import java.util.concurrent.ConcurrentLinkedQueue;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**

 * {@link PTransform Transforms} for reading from and writing to Google Cloud Bigtable.

 *

 * <p>For more information about Cloud Bigtable, see the online documentation at <a

 * href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.

 *

 * <h3>Reading from Cloud Bigtable</h3>

 *

 * <p>The Bigtable source returns a set of rows from a single table, returning a {@code

 * PCollection<Row>}.

 *

 * <p>To configure a Cloud Bigtable source, you must supply a table id, a project id, an instance id

 * and optionally a {@link BigtableOptions} to provide more specific connection configuration. By

 * default, {@link BigtableIO.Read} will read all rows in the table. The row ranges to be read can

 * optionally be restricted using {@link BigtableIO.Read#withKeyRanges}, and a {@link RowFilter} can

 * be specified using {@link BigtableIO.Read#withRowFilter}. For example:

 *

 * <pre>{@code

 * Pipeline p = ...;

 *

 * // Scan the entire table.

 * p.apply("read",

 *     BigtableIO.read()

 *         .withProjectId(projectId)

 *         .withInstanceId(instanceId)

 *         .withTableId("table"));

 *

 * // Scan a prefix of the table.

 * ByteKeyRange keyRange = ...;

 * p.apply("read",

 *     BigtableIO.read()

 *         .withProjectId(projectId)

 *         .withInstanceId(instanceId)

 *         .withTableId("table")

 *         .withKeyRange(keyRange));

 *

 * // Scan a subset of rows that match the specified row filter.

 * p.apply("filtered read",

 *     BigtableIO.read()

 *         .withProjectId(projectId)

 *         .withInstanceId(instanceId)

 *         .withTableId("table")

 *         .withRowFilter(filter));

 * }</pre>

 *

 * <h3>Writing to Cloud Bigtable</h3>

 *

 * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a

 * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the

 * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an

 * idempotent transformation to that row.

 *

 * <p>To configure a Cloud Bigtable sink, you must supply a table id, a project id, an instance id

 * and optionally a configuration function for {@link BigtableOptions} to provide more specific

 * connection configuration, for example:

 *

 * <pre>{@code

 * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;

 *

 * data.apply("write",

 *     BigtableIO.write()

 *         .withProjectId("project")

 *         .withInstanceId("instance")

 *         .withTableId("table"));

 * }</pre>

 *

 * <h3>Experimental</h3>

 *

 * <p>This connector for Cloud Bigtable is considered experimental and may break or receive

 * backwards-incompatible changes in future versions of the Apache Unified SDK. Cloud Bigtable is in

 * Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.

 *

 * <h3>Permissions</h3>

 *

 * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the

 * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner

 * PipelineRunners} for more details.

 */

@Experimental(Experimental.Kind.SOURCE_SINK)

public class BigtableIO {

  // Shared SLF4J logger for diagnostics emitted by both the read and write paths.
  private static final Logger LOG = LoggerFactory.getLogger(BigtableIO.class);



  /**
   * Returns a new, unconfigured {@link BigtableIO.Read}.
   *
   * <p>Callers must supply the source instance via {@link BigtableIO.Read#withProjectId} and
   * {@link BigtableIO.Read#withInstanceId}, and the table via {@link BigtableIO.Read#withTableId},
   * before the transform is applied. An optional {@link RowFilter} can be attached with
   * {@link BigtableIO.Read#withRowFilter(RowFilter)}.
   */
  @Experimental
  public static Read read() {
    return Read.create();
  }



  /**
   * Returns a new, unconfigured {@link BigtableIO.Write}.
   *
   * <p>Callers must supply the destination instance via {@link BigtableIO.Write#withProjectId}
   * and {@link BigtableIO.Write#withInstanceId}, and the table via
   * {@link BigtableIO.Write#withTableId}, before the transform is applied.
   */
  @Experimental
  public static Write write() {
    return Write.create();
  }



  /**
   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
   * {@link BigtableIO} for more information.
   *
   * <p>Instances are immutable: every {@code with*} method returns a new {@code Read} built via
   * the AutoValue {@link Builder}, leaving the receiver unchanged.
   *
   * @see BigtableIO
   */
  @Experimental(Experimental.Kind.SOURCE_SINK)
  @AutoValue
  public abstract static class Read extends PTransform<PBegin, PCollection<Row>> {

    /** Connection and validation configuration shared with the Bigtable service layer. */
    abstract BigtableConfig getBigtableConfig();

    /** Server-side row filter applied while reading, or null when no filtering is requested. */
    @Nullable
    abstract RowFilter getRowFilter();

    /** Returns the ranges of keys that will be read from the table. */
    @Nullable
    public abstract List<ByteKeyRange> getKeyRanges();

    /** Returns the table being read from. */
    @Nullable
    public String getTableId() {
      ValueProvider<String> tableId = getBigtableConfig().getTableId();
      // The table id may be a deferred ValueProvider; only resolve it when it is accessible.
      return tableId != null && tableId.isAccessible() ? tableId.get() : null;
    }

    /**
     * Returns the Google Cloud Bigtable instance being read from, and other parameters.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    @Nullable
    public BigtableOptions getBigtableOptions() {
      return getBigtableConfig().getBigtableOptions();
    }

    abstract Builder toBuilder();

    /**
     * Creates a {@link Read} with defaults: empty table id, validation enabled, and a single key
     * range spanning the entire table ({@link ByteKeyRange#ALL_KEYS}).
     */
    static Read create() {
      BigtableConfig config =
          BigtableConfig.builder()
              .setTableId(ValueProvider.StaticValueProvider.of(""))
              .setValidate(true)
              .build();

      return new AutoValue_BigtableIO_Read.Builder()
          .setBigtableConfig(config)
          .setKeyRanges(Arrays.asList(ByteKeyRange.ALL_KEYS))
          .build();
    }

    /** AutoValue builder; used internally by the {@code with*} methods to derive new instances. */
    @AutoValue.Builder
    abstract static class Builder {

      abstract Builder setBigtableConfig(BigtableConfig bigtableConfig);

      abstract Builder setRowFilter(RowFilter filter);

      abstract Builder setKeyRanges(List<ByteKeyRange> keyRange);

      abstract Read build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable project
     * indicated by given parameter, requires {@link #withInstanceId} to be called to determine the
     * instance.
     *
     * <p>Does not modify this object.
     */
    public Read withProjectId(ValueProvider<String> projectId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable project
     * indicated by given parameter, requires {@link #withInstanceId} to be called to determine the
     * instance.
     *
     * <p>Does not modify this object.
     */
    public Read withProjectId(String projectId) {
      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
     * indicated by given parameter, requires {@link #withProjectId} to be called to determine the
     * project.
     *
     * <p>Does not modify this object.
     */
    public Read withInstanceId(ValueProvider<String> instanceId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
     * indicated by given parameter, requires {@link #withProjectId} to be called to determine the
     * project.
     *
     * <p>Does not modify this object.
     */
    public Read withInstanceId(String instanceId) {
      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
     *
     * <p>Does not modify this object.
     */
    public Read withTableId(ValueProvider<String> tableId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
     *
     * <p>Does not modify this object.
     */
    public Read withTableId(String tableId) {
      return withTableId(ValueProvider.StaticValueProvider.of(tableId));
    }

    /**
     * WARNING: Should be used only to specify additional parameters for connection to the Cloud
     * Bigtable, instanceId and projectId should be provided over {@link #withInstanceId} and {@link
     * #withProjectId} respectively.
     *
     * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
     * indicated by {@link #withProjectId}, and using any other specified customizations.
     *
     * <p>Does not modify this object.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    public Read withBigtableOptions(BigtableOptions options) {
      checkArgument(options != null, "options can not be null");
      return withBigtableOptions(options.toBuilder());
    }

    /**
     * WARNING: Should be used only to specify additional parameters for connection to the Cloud
     * Bigtable, instanceId and projectId should be provided over {@link #withInstanceId} and {@link
     * #withProjectId} respectively.
     *
     * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
     * indicated by the given options, and using any other specified customizations.
     *
     * <p>Clones the given {@link BigtableOptions} builder so that any further changes will have no
     * effect on the returned {@link BigtableIO.Read}.
     *
     * <p>Does not modify this object.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
      BigtableConfig config = getBigtableConfig();
      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
      // build().toBuilder() round-trips the builder so the caller's instance is not shared.
      return toBuilder()
          .setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build()))
          .build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance with
     * customized options provided by given configurator.
     *
     * <p>WARNING: instanceId and projectId should not be provided here and should be provided over
     * {@link #withProjectId} and {@link #withInstanceId}.
     *
     * <p>Does not modify this object.
     */
    public Read withBigtableOptionsConfigurator(
        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder()
          .setBigtableConfig(config.withBigtableOptionsConfigurator(configurator))
          .build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
     * using the given row filter.
     *
     * <p>Does not modify this object.
     */
    public Read withRowFilter(RowFilter filter) {
      checkArgument(filter != null, "filter can not be null");
      return toBuilder().setRowFilter(filter).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
     *
     * <p>Does not modify this object.
     */
    public Read withKeyRange(ByteKeyRange keyRange) {
      checkArgument(keyRange != null, "keyRange can not be null");
      return toBuilder().setKeyRanges(Arrays.asList(keyRange)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read only rows in the specified ranges.
     * Ranges must not overlap.
     *
     * <p>Does not modify this object.
     */
    public Read withKeyRanges(List<ByteKeyRange> keyRanges) {
      checkArgument(keyRanges != null, "keyRanges can not be null");
      checkArgument(!keyRanges.isEmpty(), "keyRanges can not be empty");
      for (ByteKeyRange range : keyRanges) {
        checkArgument(range != null, "keyRanges cannot hold null range");
      }
      return toBuilder().setKeyRanges(keyRanges).build();
    }

    /** Disables validation that the table being read from exists. */
    public Read withoutValidation() {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withValidate(false)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable service
     * implementation.
     *
     * <p>This is used for testing.
     *
     * <p>Does not modify this object.
     */
    @VisibleForTesting
    Read withBigtableService(BigtableService bigtableService) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
    }

    @Override
    public PCollection<Row> expand(PBegin input) {
      getBigtableConfig().validate();

      // estimatedSizeBytes starts as null; the source computes it on demand.
      BigtableSource source =
          new BigtableSource(getBigtableConfig(), getRowFilter(), getKeyRanges(), null);
      return input.getPipeline().apply(com.bff.gaia.unified.sdk.io.Read.from(source));
    }

    @Override
    public void validate(PipelineOptions options) {
      validateTableExists(getBigtableConfig(), options);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      getBigtableConfig().populateDisplayData(builder);

      // Only the first 5 key ranges are surfaced, keeping display data bounded; ranges equal to
      // ALL_KEYS are treated as the default and omitted.
      List<ByteKeyRange> keyRanges = getKeyRanges();
      for (int i = 0; i < keyRanges.size() && i < 5; i++) {
        builder.addIfNotDefault(
            DisplayData.item("keyRange " + i, keyRanges.get(i).toString()),
            ByteKeyRange.ALL_KEYS.toString());
      }

      if (getRowFilter() != null) {
        builder.add(
            DisplayData.item("rowFilter", getRowFilter().toString()).withLabel("Table Row Filter"));
      }
    }

    @Override
    public String toString() {
      ToStringHelper helper =
          MoreObjects.toStringHelper(Read.class).add("config", getBigtableConfig());
      for (int i = 0; i < getKeyRanges().size(); i++) {
        helper.add("keyRange " + i, getKeyRanges().get(i));
      }
      return helper.add("filter", getRowFilter()).toString();
    }
  }



  /**
   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
   * {@link BigtableIO} for more information.
   *
   * <p>Instances are immutable: every {@code with*} method returns a new {@code Write} built via
   * the AutoValue {@link Builder}, leaving the receiver unchanged.
   *
   * @see BigtableIO
   */
  @Experimental(Experimental.Kind.SOURCE_SINK)
  @AutoValue
  public abstract static class Write
      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {

    /**
     * Returns a configurator that first applies {@code userConfigurator} (if non-null) and then
     * forces the bulk API on, so writes always use Bigtable's bulk mutation path.
     */
    static SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
        enableBulkApiConfigurator(
            final @Nullable SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
                    userConfigurator) {
      return optionsBuilder -> {
        // Apply the user's customizations first so the bulk-API setting below always wins.
        if (userConfigurator != null) {
          optionsBuilder = userConfigurator.apply(optionsBuilder);
        }

        return optionsBuilder.setBulkOptions(
            optionsBuilder.build().getBulkOptions().toBuilder().setUseBulkApi(true).build());
      };
    }

    /** Connection and validation configuration shared with the Bigtable service layer. */
    abstract BigtableConfig getBigtableConfig();

    /**
     * Returns the Google Cloud Bigtable instance being written to, and other parameters.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    @Nullable
    public BigtableOptions getBigtableOptions() {
      return getBigtableConfig().getBigtableOptions();
    }

    abstract Builder toBuilder();

    /**
     * Creates a {@link Write} with defaults: empty table id, validation enabled, and the bulk
     * write API forced on via {@link #enableBulkApiConfigurator}.
     */
    static Write create() {
      BigtableConfig config =
          BigtableConfig.builder()
              .setTableId(ValueProvider.StaticValueProvider.of(""))
              .setValidate(true)
              .setBigtableOptionsConfigurator(enableBulkApiConfigurator(null))
              .build();

      return new AutoValue_BigtableIO_Write.Builder().setBigtableConfig(config).build();
    }

    /** AutoValue builder; used internally by the {@code with*} methods to derive new instances. */
    @AutoValue.Builder
    abstract static class Builder {

      abstract Builder setBigtableConfig(BigtableConfig bigtableConfig);

      abstract Write build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable project
     * indicated by given parameter, requires {@link #withInstanceId} to be called to determine the
     * instance.
     *
     * <p>Does not modify this object.
     */
    public Write withProjectId(ValueProvider<String> projectId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable project
     * indicated by given parameter, requires {@link #withInstanceId} to be called to determine the
     * instance.
     *
     * <p>Does not modify this object.
     */
    public Write withProjectId(String projectId) {
      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable instance
     * indicated by given parameter, requires {@link #withProjectId} to be called to determine the
     * project.
     *
     * <p>Does not modify this object.
     */
    public Write withInstanceId(ValueProvider<String> instanceId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable instance
     * indicated by given parameter, requires {@link #withProjectId} to be called to determine the
     * project.
     *
     * <p>Does not modify this object.
     */
    public Write withInstanceId(String instanceId) {
      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
     *
     * <p>Does not modify this object.
     */
    public Write withTableId(ValueProvider<String> tableId) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
     *
     * <p>Does not modify this object.
     */
    public Write withTableId(String tableId) {
      return withTableId(ValueProvider.StaticValueProvider.of(tableId));
    }

    /**
     * WARNING: Should be used only to specify additional parameters for connection to the Cloud
     * Bigtable, instanceId and projectId should be provided over {@link #withInstanceId} and {@link
     * #withProjectId} respectively.
     *
     * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
     * indicated by the given options, and using any other specified customizations.
     *
     * <p>Does not modify this object.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    public Write withBigtableOptions(BigtableOptions options) {
      checkArgument(options != null, "options can not be null");
      return withBigtableOptions(options.toBuilder());
    }

    /**
     * WARNING: Should be used only to specify additional parameters for connection to the Cloud
     * Bigtable, instanceId and projectId should be provided over {@link #withInstanceId} and {@link
     * #withProjectId} respectively.
     *
     * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
     * indicated by the given options, and using any other specified customizations.
     *
     * <p>Clones the given {@link BigtableOptions} builder so that any further changes will have no
     * effect on the returned {@link BigtableIO.Write}.
     *
     * <p>Does not modify this object.
     *
     * @deprecated will be replaced by bigtable options configurator.
     */
    @Deprecated
    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
      BigtableConfig config = getBigtableConfig();
      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
      // build().toBuilder() round-trips the builder so the caller's instance is not shared.
      return toBuilder()
          .setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build()))
          .build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance with
     * customized options provided by given configurator.
     *
     * <p>WARNING: instanceId and projectId should not be provided here and should be provided over
     * {@link #withProjectId} and {@link #withInstanceId}.
     *
     * <p>Does not modify this object.
     */
    public Write withBigtableOptionsConfigurator(
        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) {
      BigtableConfig config = getBigtableConfig();
      // The user configurator is wrapped so the bulk API stays enabled regardless of its output.
      return toBuilder()
          .setBigtableConfig(
              config.withBigtableOptionsConfigurator(enableBulkApiConfigurator(configurator)))
          .build();
    }

    /** Disables validation that the table being written to exists. */
    public Write withoutValidation() {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withValidate(false)).build();
    }

    /**
     * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable service
     * implementation.
     *
     * <p>This is used for testing.
     *
     * <p>Does not modify this object.
     */
    // NOTE(review): Read's equivalent method carries @VisibleForTesting; consider adding it here
    // for consistency.
    Write withBigtableService(BigtableService bigtableService) {
      BigtableConfig config = getBigtableConfig();
      return toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
    }

    @Override
    public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
      getBigtableConfig().validate();

      input.apply(ParDo.of(new BigtableWriterFn(getBigtableConfig())));
      return PDone.in(input.getPipeline());
    }

    @Override
    public void validate(PipelineOptions options) {
      validateTableExists(getBigtableConfig(), options);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      getBigtableConfig().populateDisplayData(builder);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(Write.class).add("config", getBigtableConfig()).toString();
    }

    /**
     * DoFn that issues asynchronous writes to Bigtable. Per-record failures are collected in a
     * concurrent queue and surfaced (failing the bundle) on the next write or at bundle finish.
     *
     * <p>Non-static by design: {@link #populateDisplayData} delegates to the enclosing
     * {@link Write}.
     */
    private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {

      public BigtableWriterFn(BigtableConfig bigtableConfig) {
        this.config = bigtableConfig;
        this.failures = new ConcurrentLinkedQueue<>();
      }

      @StartBundle
      public void startBundle(DoFn.StartBundleContext c) throws IOException {
        // The writer is reused across bundles; it is only reopened after tearDown closed it.
        if (bigtableWriter == null) {
          bigtableWriter =
              config
                  .getBigtableService(c.getPipelineOptions())
                  .openForWriting(config.getTableId().get());
        }
        recordsWritten = 0;
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Surface any asynchronous failures from earlier writes before issuing a new one.
        checkForFailures();
        bigtableWriter
            .writeRecord(c.element())
            .whenComplete(
                (mutationResult, exception) -> {
                  // Completion may run on another thread; failures is a concurrent queue.
                  if (exception != null) {
                    failures.add(new BigtableWriteException(c.element(), exception));
                  }
                });
        ++recordsWritten;
      }

      @FinishBundle
      public void finishBundle() throws Exception {
        // Flush forces all outstanding async writes to complete before we check for failures.
        bigtableWriter.flush();
        checkForFailures();
        LOG.debug("Wrote {} records", recordsWritten);
      }

      @Teardown
      public void tearDown() throws Exception {
        if (bigtableWriter != null) {
          bigtableWriter.close();
          bigtableWriter = null;
        }
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        builder.delegate(Write.this);
      }

      ///////////////////////////////////////////////////////////////////////////////
      // Connection parameters and service factory for the destination table.
      private final BigtableConfig config;
      // Lazily opened in startBundle; closed and nulled in tearDown.
      private BigtableService.Writer bigtableWriter;
      // Per-bundle counter, reset in startBundle; used only for debug logging.
      private long recordsWritten;
      // Failures reported by async write completions; drained by checkForFailures.
      private final ConcurrentLinkedQueue<BigtableWriteException> failures;

      /** If any write has asynchronously failed, fail the bundle with a useful error. */
      private void checkForFailures() throws IOException {
        // Note that this function is never called by multiple threads and is the only place that
        // we remove from failures, so this code is safe.
        if (failures.isEmpty()) {
          return;
        }

        // Summarize at most the first 10 failures in the message; attach them as suppressed
        // exceptions so full stack traces are preserved.
        StringBuilder logEntry = new StringBuilder();
        int i = 0;
        List<BigtableWriteException> suppressed = Lists.newArrayList();
        for (; i < 10 && !failures.isEmpty(); ++i) {
          BigtableWriteException exc = failures.remove();
          logEntry.append("\n").append(exc.getMessage());
          if (exc.getCause() != null) {
            logEntry.append(": ").append(exc.getCause().getMessage());
          }
          suppressed.add(exc);
        }
        String message =
            String.format(
                "At least %d errors occurred writing to Bigtable. First %d errors: %s",
                i + failures.size(), i, logEntry.toString());
        LOG.error(message);
        IOException exception = new IOException(message);
        for (BigtableWriteException e : suppressed) {
          exception.addSuppressed(e);
        }
        throw exception;
      }
    }
  }



  //////////////////////////////////////////////////////////////////////////////////////////

  /** Disallow construction of this utility class; all entry points are static factories. */
  private BigtableIO() {}



  /** Converts a Bigtable {@link ByteString} row key into an SDK {@link ByteKey}. */
  private static ByteKey makeByteKey(ByteString key) {
    return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
  }



  static class BigtableSource extends BoundedSource<Row> {

    /**
     * Creates a source reading the given key ranges of a Bigtable table.
     *
     * @param sourceConfig connection parameters and service factory
     * @param rowFilter optional server-side row filter; null reads unfiltered rows
     * @param keyRanges key ranges this source covers
     * @param sizeEstimateBytes estimated byte size, or null when not yet known
     */
    public BigtableSource(
        BigtableConfig sourceConfig,
        @Nullable RowFilter rowFilter,
        List<ByteKeyRange> keyRanges,
        @Nullable Long sizeEstimateBytes) {
      this.estimatedSizeBytes = sizeEstimateBytes;
      this.ranges = keyRanges;
      this.filter = rowFilter;
      this.config = sourceConfig;
    }



    /** Renders the source's configuration, filter, ranges and size estimate for debugging. */
    @Override
    public String toString() {
      ToStringHelper description = MoreObjects.toStringHelper(BigtableSource.class);
      description.add("config", config);
      description.add("filter", filter);
      description.add("ranges", ranges);
      description.add("estimatedSizeBytes", estimatedSizeBytes);
      return description.toString();
    }



    ////// Private state and internal implementation details //////
    // Connection parameters and service factory for the table being read.
    private final BigtableConfig config;
    // Optional server-side row filter; null means read all rows.
    @Nullable private final RowFilter filter;
    // Key ranges this source is responsible for reading.
    private final List<ByteKeyRange> ranges;
    // Estimated byte size of this source; null when not yet known.
    @Nullable private Long estimatedSizeBytes;
    // Cached tablet key samples; transient, so it is not serialized with the source.
    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;



    /** Creates a copy of this {@link BigtableSource} restricted to the single given range. */
    protected BigtableSource withSingleRange(ByteKeyRange range) {
      checkArgument(range != null, "range can not be null");
      List<ByteKeyRange> singleRange = Arrays.asList(range);
      return new BigtableSource(config, filter, singleRange, estimatedSizeBytes);
    }



    /** Creates a copy of this {@link BigtableSource} carrying the given size estimate. */
    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
      checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes can not be null");
      BigtableSource sized = new BigtableSource(config, filter, ranges, estimatedSizeBytes);
      return sized;
    }



    /**
     * Fetches tablet key-boundary samples (with estimated sizes) from the Cloud Bigtable service.
     * The samples let split points land on tablet boundaries, and allow sub-splitting within a
     * tablet.
     *
     * @throws IOException if the service call fails
     */
    private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions pipelineOptions)
        throws IOException {
      BigtableService service = config.getBigtableService(pipelineOptions);
      return service.getSampleRowKeys(this);
    }



    private static final long MAX_SPLIT_COUNT = 15_360L;



    @Override

    public List<BigtableSource> split(long desiredBundleSizeBytes, PipelineOptions options)

        throws Exception {

      // Update the desiredBundleSizeBytes in order to limit the

      // number of splits to maximumNumberOfSplits.

      long maximumNumberOfSplits = 4000;

      long sizeEstimate = getEstimatedSizeBytes(options);

      desiredBundleSizeBytes =

          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);



      // Delegate to testable helper.

      List<BigtableSource> splits =

          splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));



      // Reduce the splits.

      List<BigtableSource> reduced = reduceSplits(splits, options, MAX_SPLIT_COUNT);

      // Randomize the result before returning an immutable copy of the splits, the default behavior

      // may lead to multiple workers hitting the same tablet.

      Collections.shuffle(reduced);

      return ImmutableList.copyOf(reduced);

    }



    /** Returns a mutable list of reduced splits. */

    @VisibleForTesting

    protected List<BigtableSource> reduceSplits(

		List<BigtableSource> splits, PipelineOptions options, long maxSplitCounts)

        throws IOException {

      int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts);

      if (splits.size() < maxSplitCounts || numberToCombine < 2) {

        return new ArrayList<>(splits);

      }

      List<BigtableSource> reducedSplits = new ArrayList<>();

      List<ByteKeyRange> previousSourceRanges = new ArrayList<>();

      int counter = 0;

      long size = 0;

      for (BigtableSource source : splits) {

        if (counter == numberToCombine

            || !checkRangeAdjacency(previousSourceRanges, source.getRanges())) {

          reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size));

          counter = 0;

          size = 0;

          previousSourceRanges = new ArrayList<>();

        }

        previousSourceRanges.addAll(source.getRanges());

        previousSourceRanges = mergeRanges(previousSourceRanges);

        size += source.getEstimatedSizeBytes(options);

        counter++;

      }

      if (size > 0) {

        reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size));

      }

      return reducedSplits;

    }



    /**

     * Helper to validate range Adjacency. Ranges are considered adjacent if

     * [1..100][100..200][200..300]

     */

    private static boolean checkRangeAdjacency(

		List<ByteKeyRange> ranges, List<ByteKeyRange> otherRanges) {

      checkArgument(ranges != null || otherRanges != null, "Both ranges cannot be null.");

      ImmutableList.Builder<ByteKeyRange> mergedRanges = ImmutableList.builder();

      if (ranges != null) {

        mergedRanges.addAll(ranges);

      }

      if (otherRanges != null) {

        mergedRanges.addAll(otherRanges);

      }

      return checkRangeAdjacency(mergedRanges.build());

    }



    /**

     * Helper to validate range Adjacency. Ranges are considered adjacent if

     * [1..100][100..200][200..300]

     */

    private static boolean checkRangeAdjacency(List<ByteKeyRange> ranges) {

      int index = 0;

      if (ranges.size() < 2) {

        return true;

      }

      ByteKey lastEndKey = ranges.get(index++).getEndKey();

      while (index < ranges.size()) {

        ByteKeyRange currentKeyRange = ranges.get(index++);

        if (!lastEndKey.equals(currentKeyRange.getStartKey())) {

          return false;

        }

        lastEndKey = currentKeyRange.getEndKey();

      }

      return true;

    }



    /**

     * Helper to combine/merge ByteKeyRange Ranges should only be merged if they are adjacent ex.

     * [1..100][100..200][200..300] will result in [1..300] Note: this method will not check for

     * adjacency see {@link #checkRangeAdjacency(List)}

     */

    private static List<ByteKeyRange> mergeRanges(List<ByteKeyRange> ranges) {

      List<ByteKeyRange> response = new ArrayList<>();

      if (ranges.size() < 2) {

        response.add(ranges.get(0));

      } else {

        response.add(

            ByteKeyRange.of(

                ranges.get(0).getStartKey(), ranges.get(ranges.size() - 1).getEndKey()));

      }

      return response;

    }



    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */

    private List<BigtableSource> splitBasedOnSamples(

        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {

      // There are no regions, or no samples available. Just scan the entire range.

      if (sampleRowKeys.isEmpty()) {

        LOG.info("Not splitting source {} because no sample row keys are available.", this);

        return Collections.singletonList(this);

      }

      LOG.info(

          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",

          desiredBundleSizeBytes,

          sampleRowKeys.size(),

          sampleRowKeys.get(0));



      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();

      for (ByteKeyRange range : ranges) {

        splits.addAll(splitRangeBasedOnSamples(desiredBundleSizeBytes, sampleRowKeys, range));

      }

      return splits.build();

    }



    /**

     * Helper that splits a {@code ByteKeyRange} into bundles based on Cloud Bigtable sampled row

     * keys.

     */

    private List<BigtableSource> splitRangeBasedOnSamples(

        long desiredBundleSizeBytes,

        List<SampleRowKeysResponse> sampleRowKeys,

        ByteKeyRange range) {



      // Loop through all sampled responses and generate splits from the ones that overlap the

      // scan range. The main complication is that we must track the end range of the previous

      // sample to generate good ranges.

      ByteKey lastEndKey = ByteKey.EMPTY;

      long lastOffset = 0;

      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();

      for (SampleRowKeysResponse response : sampleRowKeys) {

        ByteKey responseEndKey = makeByteKey(response.getRowKey());

        long responseOffset = response.getOffsetBytes();

        checkState(

            responseOffset >= lastOffset,

            "Expected response byte offset %s to come after the last offset %s",

            responseOffset,

            lastOffset);



        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {

          // This region does not overlap the scan, so skip it.

          lastOffset = responseOffset;

          lastEndKey = responseEndKey;

          continue;

        }



        // Calculate the beginning of the split as the larger of startKey and the end of the last

        // split. Unspecified start is smallest key so is correctly treated as earliest key.

        ByteKey splitStartKey = lastEndKey;

        if (splitStartKey.compareTo(range.getStartKey()) < 0) {

          splitStartKey = range.getStartKey();

        }



        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note

        // that range.containsKey handles the case when range.getEndKey() is empty.

        ByteKey splitEndKey = responseEndKey;

        if (!range.containsKey(splitEndKey)) {

          splitEndKey = range.getEndKey();

        }



        // We know this region overlaps the desired key range, and we know a rough estimate of its

        // size. Split the key range into bundle-sized chunks and then add them all as splits.

        long sampleSizeBytes = responseOffset - lastOffset;

        List<BigtableSource> subSplits =

            splitKeyRangeIntoBundleSizedSubranges(

                sampleSizeBytes,

                desiredBundleSizeBytes,

                ByteKeyRange.of(splitStartKey, splitEndKey));

        splits.addAll(subSplits);



        // Move to the next region.

        lastEndKey = responseEndKey;

        lastOffset = responseOffset;

      }



      // We must add one more region after the end of the samples if both these conditions hold:

      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).

      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).

      if (!lastEndKey.isEmpty()

          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {

        splits.add(this.withSingleRange(ByteKeyRange.of(lastEndKey, range.getEndKey())));

      }



      List<BigtableSource> ret = splits.build();

      LOG.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));

      return ret;

    }



    @Override

    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {

      // Delegate to testable helper.

      if (estimatedSizeBytes == null) {

        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys(options));

      }

      return estimatedSizeBytes;

    }



    /**

     * Computes the estimated size in bytes based on the total size of all samples that overlap the

     * key ranges this source will scan.

     */

    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {

      long estimatedSizeBytes = 0;

      long lastOffset = 0;

      ByteKey currentStartKey = ByteKey.EMPTY;

      // Compute the total estimated size as the size of each sample that overlaps the scan range.

      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a

      // filter or to sample on a given key range.

      for (SampleRowKeysResponse response : samples) {

        ByteKey currentEndKey = makeByteKey(response.getRowKey());

        long currentOffset = response.getOffsetBytes();

        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {

          // Skip an empty region.

          lastOffset = currentOffset;

          continue;

        } else {

          for (ByteKeyRange range : ranges) {

            if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {

              estimatedSizeBytes += currentOffset - lastOffset;

              // We don't want to double our estimated size if two ranges overlap this sample

              // region, so exit early.

              break;

            }

          }

        }

        currentStartKey = currentEndKey;

        lastOffset = currentOffset;

      }

      return estimatedSizeBytes;

    }



    @Override

    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {

      return new BigtableReader(this, config.getBigtableService(options));

    }



    @Override

    public void validate() {

      if (!config.getValidate()) {

        LOG.debug("Validation is disabled");

        return;

      }



      ValueProvider<String> tableId = config.getTableId();

      checkArgument(

          tableId != null && tableId.isAccessible() && !tableId.get().isEmpty(),

          "tableId was not supplied");

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);



      builder.add(DisplayData.item("tableId", config.getTableId()).withLabel("Table ID"));



      if (filter != null) {

        builder.add(DisplayData.item("rowFilter", filter.toString()).withLabel("Table Row Filter"));

      }

    }



    @Override

    public Coder<Row> getOutputCoder() {

      return ProtoCoder.of(Row.class);

    }



    /** Helper that splits the specified range in this source into bundles. */

    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(

        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {

      // Catch the trivial cases. Split is small enough already, or this is the last region.

      LOG.debug(

          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",

          sampleSizeBytes,

          desiredBundleSizeBytes);

      if (sampleSizeBytes <= desiredBundleSizeBytes) {

        return Collections.singletonList(

            this.withSingleRange(ByteKeyRange.of(range.getStartKey(), range.getEndKey())));

      }



      checkArgument(

          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);

      checkArgument(

          desiredBundleSizeBytes > 0,

          "Desired bundle size %s bytes must be greater than 0.",

          desiredBundleSizeBytes);



      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / desiredBundleSizeBytes);

      List<ByteKey> splitKeys = range.split(splitCount);

      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();

      Iterator<ByteKey> keys = splitKeys.iterator();

      ByteKey prev = keys.next();

      while (keys.hasNext()) {

        ByteKey next = keys.next();

        splits.add(

            this.withSingleRange(ByteKeyRange.of(prev, next))

                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));

        prev = next;

      }

      return splits.build();

    }



    public List<ByteKeyRange> getRanges() {

      return ranges;

    }



    public RowFilter getRowFilter() {

      return filter;

    }



    public ValueProvider<String> getTableId() {

      return config.getTableId();

    }

  }



  /**
   * A {@link BoundedReader} that streams rows for a {@link BigtableSource} covering exactly one
   * {@link ByteKeyRange}, supporting dynamic work rebalancing via {@link #splitAtFraction}.
   */
  private static class BigtableReader extends BoundedReader<Row> {
    // Guarded by "this": currentSource is only read or written inside synchronized methods (or
    // the constructor), so splitAtFraction can swap it safely.
    private BigtableSource currentSource;
    private BigtableService service;
    private BigtableService.Reader rowReader;
    private final ByteKeyRangeTracker tracker;
    private long rowsReturned;

    public BigtableReader(BigtableSource source, BigtableService service) {
      checkArgument(source.getRanges().size() == 1, "source must have exactly one key range");
      this.currentSource = source;
      this.service = service;
      this.tracker = ByteKeyRangeTracker.of(source.getRanges().get(0));
    }

    @Override
    public boolean start() throws IOException {
      rowReader = service.createReader(getCurrentSource());
      boolean claimed =
          rowReader.start()
              && tracker.tryReturnRecordAt(true, makeByteKey(rowReader.getCurrentRow().getKey()));
      return bookRecord(claimed);
    }

    @Override
    public boolean advance() throws IOException {
      boolean claimed =
          rowReader.advance()
              && tracker.tryReturnRecordAt(true, makeByteKey(rowReader.getCurrentRow().getKey()));
      return bookRecord(claimed);
    }

    /**
     * Counts a successfully claimed record; when none was claimed, marks the tracker done and
     * returns its result. Shared tail of {@link #start} and {@link #advance}.
     */
    private boolean bookRecord(boolean claimed) {
      boolean hasRecord = claimed || tracker.markDone();
      if (hasRecord) {
        ++rowsReturned;
      }
      return hasRecord;
    }

    @Override
    public synchronized BigtableSource getCurrentSource() {
      return currentSource;
    }

    @Override
    public Row getCurrent() throws NoSuchElementException {
      return rowReader.getCurrentRow();
    }

    @Override
    public void close() throws IOException {
      LOG.info("Closing reader after reading {} records.", rowsReturned);
      if (rowReader != null) {
        rowReader.close();
        rowReader = null;
      }
    }

    @Override
    public final Double getFractionConsumed() {
      return tracker.getFractionConsumed();
    }

    @Override
    public final long getSplitPointsConsumed() {
      return tracker.getSplitPointsConsumed();
    }

    /**
     * Attempts to split off the remainder of the range at {@code fraction}, returning the residual
     * source, or {@code null} when the split key cannot be computed or the tracker rejects it.
     */
    @Override
    @Nullable
    public final synchronized BigtableSource splitAtFraction(double fraction) {
      ByteKeyRange range = tracker.getRange();
      ByteKey splitKey;
      try {
        splitKey = range.interpolateKey(fraction);
      } catch (RuntimeException e) {
        LOG.info("{}: Failed to interpolate key for fraction {}.", range, fraction, e);
        return null;
      }
      LOG.info("Proposing to split {} at fraction {} (key {})", tracker, fraction, splitKey);
      BigtableSource primary;
      BigtableSource residual;
      try {
        primary = currentSource.withSingleRange(ByteKeyRange.of(range.getStartKey(), splitKey));
        residual = currentSource.withSingleRange(ByteKeyRange.of(splitKey, range.getEndKey()));
      } catch (RuntimeException e) {
        LOG.info(
            "{}: Interpolating for fraction {} yielded invalid split key {}.",
            tracker.getRange(),
            fraction,
            splitKey,
            e);
        return null;
      }
      if (!tracker.trySplitAtPosition(splitKey)) {
        return null;
      }
      currentSource = primary;
      return residual;
    }
  }



  /** An exception that puts information about the failed record being written in its message. */

  /** An exception that puts information about the failed record being written in its message. */
  static class BigtableWriteException extends IOException {
    // IOException is Serializable; declare an explicit ID so serialized instances remain
    // compatible across recompilation instead of relying on a compiler-generated value.
    private static final long serialVersionUID = 1L;

    /**
     * @param record the row key and mutations whose write failed; both are embedded in the
     *     exception message to aid debugging
     * @param cause the underlying failure reported while mutating the row
     */
    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
      super(
          String.format(
              "Error mutating row %s with mutations %s",
              record.getKey().toStringUtf8(), record.getValue()),
          cause);
    }
  }



  /**
   * Best-effort check that the configured table exists. Skipped entirely when validation is
   * disabled or the config's data is not yet accessible; a failed existence RPC is logged and
   * ignored rather than failing pipeline construction.
   */
  static void validateTableExists(BigtableConfig config, PipelineOptions options) {
    if (!config.getValidate() || !config.isDataAccessible()) {
      return;
    }
    String tableId = checkNotNull(config.getTableId().get());
    try {
      checkArgument(
          config.getBigtableService(options).tableExists(tableId),
          "Table %s does not exist",
          tableId);
    } catch (IOException e) {
      // Deliberately best-effort: an RPC failure should not block pipeline construction.
      LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
    }
  }

}